/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.commands.cdc

import java.sql.Timestamp

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader.{cdcReadSchema, DeltaCDFRelation}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSQLConf}

import org.apache.spark.internal.MDC
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, CatalystScan, Filter}
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * Represents a Delta log version, and how the version is determined.
 * @param version the determined version.
 * @param timestamp the commit timestamp of the determined version. Filled in only when the
 *                  version was derived from a user-supplied timestamp.
 */
case class ResolvedCDFVersion(version: Long, timestamp: Option[Timestamp]) {
  /** True when this version was derived from a timestamp rather than given explicitly. */
  def resolvedByTimestamp: Boolean = timestamp.nonEmpty
}

/**
 * A snapshot coupled with the batch CDF schema mode the user specified for the read.
 * The schema mode decides which snapshot's schema the CDF relation ultimately exposes.
 */
case class SnapshotWithSchemaMode(snapshot: Snapshot, schemaMode: DeltaBatchCDFSchemaMode)

/**
 * A special BaseRelation wrapper for CDF reads.
 */
/**
 * A special BaseRelation wrapper for CDF reads.
 *
 * The relation's schema is frozen at analysis time: depending on the user-specified schema
 * mode it is taken either from the snapshot captured in `snapshotWithSchemaMode` or from the
 * snapshot at the (capped) ending version.
 *
 * @param snapshotWithSchemaMode the analysis-time snapshot plus the user's CDF schema mode
 * @param sqlContext the active SQLContext
 * @param catalogTableOpt the catalog table for this Delta table, when one exists
 * @param startingVersion inclusive start version of the CDF range, if resolved
 * @param endingVersion inclusive end version of the CDF range, if specified
 */
abstract class DeltaCDFRelationBase(
    snapshotWithSchemaMode: SnapshotWithSchemaMode,
    sqlContext: SQLContext,
    catalogTableOpt: Option[CatalogTable],
    startingVersion: Option[Long],
    endingVersion: Option[Long]) extends BaseRelation with CatalystScan {

  protected val deltaLog: DeltaLog = snapshotWithSchemaMode.snapshot.deltaLog

  // Lazy so the log update (which can incur I/O) only runs when a schema mode that
  // actually needs the latest version is in effect.
  protected lazy val latestVersionOfTableDuringAnalysis: Long =
    deltaLog.update(catalogTableOpt = catalogTableOpt).version

  /**
   * The version whose schema is used under `BatchCDFSchemaEndVersion`: the requested ending
   * version capped at the table's latest version, or the latest version when no ending
   * version was specified.
   *
   * There may be a slight divergence here in terms of what schema is in the latest data vs what
   * schema we have captured during analysis, but this is an inherent limitation of Spark.
   *
   * However, if there are schema changes between analysis and execution, since we froze this
   * schema, our schema incompatibility checks will kick in during the scan so we will always
   * be safe - Although it is a notable caveat that user should be aware of because the CDC query
   * may break.
   */
  protected lazy val endingVersionForBatchSchema: Long = endingVersion.map { v =>
    // As defined in the method doc, if ending version is greater than the latest version, we will
    // just use the latest version to find the schema.
    latestVersionOfTableDuringAnalysis min v
  }.getOrElse {
    // Or if endingVersion is not specified, we just use the latest schema.
    latestVersionOfTableDuringAnalysis
  }

  // The final snapshot whose schema is going to be used as this CDF relation's schema
  protected val snapshotForBatchSchema: Snapshot = snapshotWithSchemaMode.schemaMode match {
    case BatchCDFSchemaEndVersion =>
      // Fetch the ending version and its schema
      deltaLog.getSnapshotAt(endingVersionForBatchSchema, catalogTableOpt = catalogTableOpt)
    case _ =>
      // Apply the default, either latest generated by DeltaTableV2 or specified by Time-travel
      // options.
      snapshotWithSchemaMode.snapshot
  }

  // The table schema with internal Delta/writer metadata stripped and the CDC metadata
  // columns appended by cdcReadSchema.
  override val schema: StructType = {
    cdcReadSchema(
      DeltaTableUtils.removeInternalDeltaMetadata(
        sqlContext.sparkSession,
        DeltaTableUtils.removeInternalWriterMetadata(
          sqlContext.sparkSession, snapshotForBatchSchema.metadata.schema
        )
      )
    )
  }

  // Report no unhandled filters: every pushed-down filter is applied in constructRDD.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty

  /**
   * Turns the internally-built CDF DataFrame into an RDD[Row] restricted to the required
   * columns and filtered by the pushed-down predicates. The attributes coming from the
   * analyzed plan are rebound (by name) to the output attributes of `df` before use.
   */
  protected def constructRDD(
      df: DataFrame,
      requiredColumns: Seq[Attribute],
      filters: Seq[Expression]): RDD[Row] = {
    // Rewrite the attributes in the required columns and
    // pushed down filters to match the output of the internal DataFrame.
    val outputMap = df.queryExecution.analyzed.output.map(a => a.name -> a).toMap
    val projections =
      requiredColumns.map(a => Column(outputMap(a.name)))
    val filter = Column(
      filters
        .map(_.transform { case a: Attribute => outputMap(a.name) })
        .reduceOption(And)
        .getOrElse(Literal.TrueLiteral)
    )

    df.filter(filter).select(projections: _*).rdd
  }
}

/**
 * Base trait for CDC readers that contains common functionality
 * shared across different CDC reader implementations.
 */
trait CDCReaderBase extends DeltaLogging {
  /**
   * Resolves one CDC range boundary, requested via `options`, to a concrete table version.
   *
   * An explicit version (under `versionKey`) takes precedence; otherwise a timestamp (under
   * `timestampKey`) is translated to the matching version. Returns `None` when neither option
   * is present. The returned [[ResolvedCDFVersion]] records whether the version came from a
   * timestamp.
   */
  private def getVersionForCDC(
      spark: SparkSession,
      deltaLog: DeltaLog,
      catalogTableOpt: Option[CatalogTable],
      conf: SQLConf,
      options: CaseInsensitiveStringMap,
      versionKey: String,
      timestampKey: String): Option[ResolvedCDFVersion] = {
    if (options.containsKey(versionKey)) {
      val rawVersion = options.get(versionKey)
      val parsedVersion =
        try {
          rawVersion.toLong
        } catch {
          // Surface a Delta-specific error for a non-numeric version string.
          case _: NumberFormatException => throw DeltaErrors.versionInvalid(rawVersion)
        }
      Some(ResolvedCDFVersion(parsedVersion, timestamp = None))
    } else if (options.containsKey(timestampKey)) {
      val rawTimestamp = options.get(timestampKey)
      val timeTravelSpec = DeltaTimeTravelSpec(Some(Literal(rawTimestamp)), None, Some("cdcReader"))
      val resolvedTimestamp = timeTravelSpec.getTimestamp(spark.sessionState.conf)
      val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
      val resolvedVersion =
        if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) {
          // A start timestamp resolves to a version at/after the timestamp, mirroring the
          // streaming-source semantics.
          DeltaSource.getStartingVersionFromTimestamp(
            spark, deltaLog, catalogTableOpt, resolvedTimestamp, allowOutOfRange)
        } else {
          // An end timestamp resolves to a version at/before the timestamp.
          DeltaTableUtils.resolveTimeTravelVersion(
            conf, deltaLog, catalogTableOpt, timeTravelSpec, allowOutOfRange)._1
        }
      Some(ResolvedCDFVersion(resolvedVersion, Some(resolvedTimestamp)))
    } else {
      None
    }
  }

  /**
   * Determines the batch CDF schema mode for a table, based on whether column mapping is
   * enabled on it.
   */
  def getBatchSchemaModeForTable(
      spark: SparkSession,
      columnMappingEnabled: Boolean): DeltaBatchCDFSchemaMode = {
    if (!columnMappingEnabled) {
      // Without column mapping we use the current default, typically `legacy`: usually the
      // latest schema, though it may also depend on time-travel arguments.
      BatchCDFSchemaLegacy
    } else {
      // Column-mapping tables may pick the schema version to read with via this session config.
      DeltaBatchCDFSchemaMode(
        spark.sessionState.conf.getConf(
          DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE))
    }
  }

  /**
   * Get a Relation that represents change data between two snapshots of the table.
   *
   * @param spark Spark session
   * @param snapshotToUse Snapshot to use to provide read schema and version
   * @param catalogTableOpt the catalog table for this Delta table, when one exists
   * @param isTimeTravelQuery Whether this CDC scan is used in conjunction with time-travel args
   * @param conf SQL conf
   * @param options CDC specific options
   */
  def getCDCRelation(
      spark: SparkSession,
      snapshotToUse: Snapshot,
      catalogTableOpt: Option[CatalogTable],
      isTimeTravelQuery: Boolean,
      conf: SQLConf,
      options: CaseInsensitiveStringMap): BaseRelation = {
    // A starting boundary (version or timestamp) is mandatory for CDC reads.
    val startingVersion = getVersionForCDC(
      spark,
      snapshotToUse.deltaLog,
      catalogTableOpt,
      conf,
      options,
      DeltaDataSource.CDC_START_VERSION_KEY,
      DeltaDataSource.CDC_START_TIMESTAMP_KEY).getOrElse {
      throw DeltaErrors.noStartVersionForCDC()
    }

    // The ending boundary is optional.
    val endingVersionOpt = getVersionForCDC(
      spark,
      snapshotToUse.deltaLog,
      catalogTableOpt,
      conf,
      options,
      DeltaDataSource.CDC_END_VERSION_KEY,
      DeltaDataSource.CDC_END_TIMESTAMP_KEY
    )

    // Out-of-range boundaries either throw or short-circuit to an empty relation.
    // `orElse` evaluates its argument lazily, so the ending-version check only runs when the
    // starting-version check passes — preserving the original early-return ordering.
    val shortCircuitRelation =
      verifyStartingVersion(spark, snapshotToUse, catalogTableOpt, conf, startingVersion)
        .orElse(verifyEndingVersion(
          spark, snapshotToUse, catalogTableOpt, startingVersion, endingVersionOpt))

    shortCircuitRelation.getOrElse {
      logInfo(
        log"startingVersion: ${MDC(DeltaLogKeys.START_VERSION, startingVersion.version)}, " +
          log"endingVersion: ${MDC(DeltaLogKeys.END_VERSION, endingVersionOpt.map(_.version))}")

      val startingSnapshot = snapshotToUse.deltaLog.getSnapshotAt(
        startingVersion.version,
        catalogTableOpt = catalogTableOpt,
        enforceTimeTravelWithinDeletedFileRetention = true)
      val columnMappingEnabledAtStartingVersion =
        startingSnapshot.metadata.columnMappingMode != NoMapping

      // The end version could be after the snapshot-to-use version, in which case its snapshot
      // might not exist yet; treat column mapping as disabled there. The guard also makes the
      // old trailing `version <= snapshotToUse.version` re-check unnecessary.
      val columnMappingEnabledAtEndVersion = endingVersionOpt.exists { endingVersion =>
        endingVersion.version <= snapshotToUse.version && {
          val endingSnapshot = snapshotToUse.deltaLog.getSnapshotAt(endingVersion.version,
            catalogTableOpt = catalogTableOpt)
          endingSnapshot.metadata.columnMappingMode != NoMapping
        }
      }

      val columnMappingEnabledAtSnapshotToUseVersion =
        snapshotToUse.metadata.columnMappingMode != NoMapping

      // Special handling for tables with column mapping mode enabled in any of the versions.
      val columnMappingEnabled = columnMappingEnabledAtSnapshotToUseVersion ||
        columnMappingEnabledAtEndVersion || columnMappingEnabledAtStartingVersion
      val schemaMode =
        getBatchSchemaModeForTable(spark, columnMappingEnabled = columnMappingEnabled)

      // Non-legacy schema mode options cannot be used with time-travel because the schema to use
      // will be confusing.
      if (isTimeTravelQuery && schemaMode != BatchCDFSchemaLegacy) {
        throw DeltaErrors.illegalDeltaOptionException(
          DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key,
          schemaMode.name,
          s"${DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key} " +
            s"cannot be used with time travel options.")
      }

      getConstructedCDCRelation(
        SnapshotWithSchemaMode(snapshotToUse, schemaMode),
        spark.sqlContext,
        catalogTableOpt,
        Some(startingVersion.version),
        endingVersionOpt.map(_.version)
      )
    }
  }

  /**
   * Validates the resolved starting version against the latest version of the table.
   *
   * Returns `Some(empty relation)` when the start is past the latest version but out-of-range
   * reads are allowed, throws `startVersionAfterLatestVersion` when they are not, and returns
   * `None` when the starting version is in range.
   */
  private def verifyStartingVersion(
      spark: SparkSession,
      snapshotToUse: Snapshot,
      catalogTableOpt: Option[CatalogTable],
      conf: SQLConf,
      startingVersion: ResolvedCDFVersion): Option[BaseRelation] = {
    // Cheap bounds check up front, instead of failing only after trying to list a large
    // version that doesn't exist.
    if (startingVersion.version > snapshotToUse.version) {
      val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
      if (allowOutOfRange) {
        Some(emptyCDFRelation(spark, snapshotToUse, catalogTableOpt, BatchCDFSchemaLegacy))
      } else {
        throw DeltaErrors.startVersionAfterLatestVersion(
          startingVersion.version, snapshotToUse.version)
      }
    } else {
      None
    }
  }

  /**
   * Validates the resolved ending version against the starting version.
   *
   * Returns `Some(empty relation)` when both boundaries were resolved from timestamps that fall
   * between the same two commits (a legitimately empty range), throws
   * `endBeforeStartVersionInCDC` when the end genuinely precedes the start, and returns `None`
   * when the range is valid or no ending version was supplied.
   *
   * Note: the original implementation used a nonlocal `return` inside a `foreach` lambda (which
   * Scala implements by throwing) and re-read `endingVersionOpt` via `.exists`/`.get` while the
   * element was already bound; this version uses `flatMap` with the same observable behavior.
   */
  private def verifyEndingVersion(
      spark: SparkSession,
      snapshotToUse: Snapshot,
      catalogTableOpt: Option[CatalogTable],
      startingVersion: ResolvedCDFVersion,
      endingVersionOpt: Option[ResolvedCDFVersion]): Option[BaseRelation] = {
    // Given two timestamps, there is a case when both of them lay closely between two versions:
    // version:          4                                                 5
    //          ---------|-------------------------------------------------|--------
    //                           ^ start timestamp        ^ end timestamp
    // In this case the starting version will be 5 and ending version will be 4. We must not
    // throw `endBeforeStartVersionInCDC` but return empty result.
    endingVersionOpt.flatMap { endingVersion =>
      val bothResolvedByTimestamp =
        startingVersion.resolvedByTimestamp && endingVersion.resolvedByTimestamp
      if (bothResolvedByTimestamp &&
          startingVersion.timestamp.get.after(endingVersion.timestamp.get)) {
        // The end timestamp truly precedes the start timestamp (no commit in between could make
        // this a valid empty range), so throw early.
        throw DeltaErrors.endBeforeStartVersionInCDC(
          startingVersion.version,
          endingVersion.version)
      }
      if (bothResolvedByTimestamp && endingVersion.version == startingVersion.version - 1) {
        // Both timestamps fall between the same two commits: valid, but empty, range.
        Some(emptyCDFRelation(spark, snapshotToUse, catalogTableOpt, BatchCDFSchemaLegacy))
      } else if (endingVersion.version < startingVersion.version) {
        throw DeltaErrors.endBeforeStartVersionInCDC(
          startingVersion.version,
          endingVersion.version)
      } else {
        None
      }
    }
  }

  /**
   * Returns a CDF relation that exposes the correct schema for the given snapshot/schema mode
   * but whose scan always yields zero rows. Used by the version-verification paths when an
   * out-of-range boundary should produce an empty result instead of an error.
   */
  private def emptyCDFRelation(
      spark: SparkSession,
      snapshot: Snapshot,
      catalogTableOpt: Option[CatalogTable],
      schemaMode: DeltaBatchCDFSchemaMode) = {
    new DeltaCDFRelation(
      SnapshotWithSchemaMode(snapshot, schemaMode),
      spark.sqlContext,
      catalogTableOpt,
      startingVersion = None,
      endingVersion = None) {
      // Ignore requested columns/filters entirely: there is no data to scan.
      override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] =
        sqlContext.sparkSession.sparkContext.emptyRDD[Row]
    }
  }

  /**
   * Returns `deltaSchema` extended with the CDC metadata columns: change type, commit version
   * and commit timestamp.
   */
  def cdcReadSchema(deltaSchema: StructType): StructType = {
    val cdcColumns = Seq(
      CDCReader.CDC_TYPE_COLUMN_NAME -> StringType,
      CDCReader.CDC_COMMIT_VERSION -> LongType,
      CDCReader.CDC_COMMIT_TIMESTAMP -> TimestampType)
    cdcColumns.foldLeft(deltaSchema) { case (schema, (name, dataType)) =>
      schema.add(name, dataType)
    }
  }

  /**
   * Checks a metadata action (which may carry a schema change) for read compatibility with the
   * frozen read schema, throwing `blockBatchCdfReadWithIncompatibleSchemaChange` when the two
   * are incompatible.
   */
  protected def checkBatchCdfReadSchemaIncompatibility(
      readSchemaSnapshot: SnapshotDescriptor,
      start: Long,
      end: Long,
      shouldCheckSchemaToBlockBatchRead: Boolean,
      metadata: Metadata,
      metadataVer: Long,
      isSchemaChange: Boolean): Unit = {
    // Skip every check when the global "I don't care" flag is set.
    if (shouldCheckSchemaToBlockBatchRead) {
      val metadataAtOrBeforeReadVersion = metadataVer <= readSchemaSnapshot.version

      // Column mapping incompatibilities. The direction matters here: we must not treat an
      // ADD COLUMN as an inverse DROP COLUMN, so the older metadata is always `oldMetadata`.
      val columnMappingCompatible = if (metadataAtOrBeforeReadVersion) {
        DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
          newMetadata = readSchemaSnapshot.metadata, oldMetadata = metadata)
      } else {
        DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
          newMetadata = metadata, oldMetadata = readSchemaSnapshot.metadata)
      }

      // Standard read incompatibilities. Declared as a def so it is only evaluated when the
      // column mapping check above passes (preserving the original short-circuit).
      def standardReadCompatible: Boolean = if (metadataAtOrBeforeReadVersion) {
        // The metadata predates the read schema version, so require that:
        // a) the metadata schema is a subset of the read schema, i.e. only ADD COLUMN can
        //    evolve the metadata schema into the read schema;
        // b) data types of common fields are unchanged;
        // c) no field that is nullable=true in the metadata schema is nullable=false in the
        //    read schema.
        SchemaUtils.isReadCompatible(
          existingSchema = metadata.schema,
          readSchema = readSchemaSnapshot.schema,
          forbidTightenNullability = true)
      } else {
        // The metadata is AFTER the read schema version — possible with time-travel or when the
        // analyzed version diverges from the latest version at scan time — so require the
        // reverse direction:
        // a) the metadata schema is a superset of the read schema, i.e. only ADD COLUMN can
        //    evolve the read schema into the metadata schema;
        // b) data types of common fields are unchanged;
        // c) no field that is nullable=false in the read schema is nullable=true in the
        //    metadata schema.
        SchemaUtils.isReadCompatible(
          existingSchema = readSchemaSnapshot.schema,
          readSchema = metadata.schema,
          forbidTightenNullability = false)
      }

      if (!(columnMappingCompatible && standardReadCompatible)) {
        throw DeltaErrors.blockBatchCdfReadWithIncompatibleSchemaChange(
          start, end,
          // The consistent read schema
          readSchemaSnapshot.metadata.schema, readSchemaSnapshot.version,
          // The conflicting schema or schema change version
          metadataVer,
          isSchemaChange
        )
      }
    }
  }

  /**
   * Whether schema read-compatibility checks should block this batch read. Streaming reads and
   * reads with the unsafe escape-hatch config enabled are exempt.
   */
  def shouldCheckSchemaToBlockBatchRead(
      spark: SparkSession,
      deltaLog: DeltaLog,
      isStreaming: Boolean): Boolean = {
    val unsafeReadAllowed = spark.sessionState.conf.getConf(
      DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES)

    // Record the opt-out so it is visible in usage logs.
    if (unsafeReadAllowed) {
      recordDeltaEvent(deltaLog, "delta.unsafe.cdf.readOnColumnMappingSchemaChanges")
    }

    !(isStreaming || unsafeReadAllowed)
  }

  /**
   * Builds the concrete CDF relation for the given snapshot/schema-mode pair and version range.
   * Implemented by concrete readers; invoked by [[getCDCRelation]] once the starting and ending
   * versions have been resolved and validated.
   */
  def getConstructedCDCRelation(
      snapshotWithSchemaMode: SnapshotWithSchemaMode,
      sqlContext: SQLContext,
      catalogTableOpt: Option[CatalogTable],
      startingVersion: Option[Long],
      endingVersion: Option[Long]): BaseRelation

  /**
   * Builds a map from commit versions to their commit timestamps, where each timestamp is the
   * (monotonized) modification time of the commit file. This never returns InCommitTimestamps;
   * the caller decides whether the file modification time is the right commit timestamp or
   * whether the ICT must be read instead.
   *
   * @param start start commit version
   * @param end end commit version (inclusive)
   */
  def getNonICTTimestampsByVersion(
      deltaLog: DeltaLog,
      start: Long,
      end: Long): Map[Long, Timestamp] = {
    // DeltaHistoryManager.getCommits() is the only source of correct timestamps here: commit
    // info timestamps are wrong, and raw file modification times first need monotonization.
    // getCommits() only lists files (their contents are never read), so the overhead is minimal.
    val monotonizationStart =
      math.max(start - DeltaHistoryManager.POTENTIALLY_UNMONOTONIZED_TIMESTAMPS, 0)
    val commits = DeltaHistoryManager.getCommitsWithNonIctTimestamps(
      deltaLog.store,
      deltaLog.logPath,
      monotonizationStart,
      Some(end + 1),
      deltaLog.newDeltaHadoopConf())

    // Filesystem modification times are milliseconds since epoch, so no timezone handling
    // is required.
    commits.map { commit => commit.version -> new Timestamp(commit.timestamp) }.toMap
  }

  /**
   * Represents the changes between some start and end version of a Delta table.
   * @param fileChangeDf contains all of the file changes (AddFile, RemoveFile, AddCDCFile)
   * @param numFiles the number of AddFile + RemoveFile + AddCDCFile actions in the df
   * @param numBytes the total size of the AddFile + RemoveFile + AddCDCFile actions in the df
   */
  case class CDCVersionDiffInfo(fileChangeDf: DataFrame, numFiles: Long, numBytes: Long)
}
